package com.innovation.ic.b1b.monitor.base.handler;

import com.google.common.base.Strings;
import com.innovation.ic.b1b.monitor.base.mapper.ZookeeperAvailableJobLogMapper;
import com.innovation.ic.b1b.monitor.base.mapper.ZookeeperAvailableJobMapper;
import com.innovation.ic.b1b.monitor.base.mapper.ZookeeperMapper;
import com.innovation.ic.b1b.monitor.base.model.Zookeeper;
import com.innovation.ic.b1b.monitor.base.model.ZookeeperAvailableJob;
import com.innovation.ic.b1b.monitor.base.model.ZookeeperAvailableJobLog;
import com.innovation.ic.b1b.monitor.base.pojo.constant.JobDataMapConstant;
import com.innovation.ic.b1b.monitor.base.pojo.enums.ActiveEnum;
import com.innovation.ic.b1b.monitor.base.value.TimeSetConfig;
import lombok.SneakyThrows;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.quartz.JobDataMap;
import org.quartz.JobExecutionContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.quartz.QuartzJobBean;
import javax.annotation.Resource;
import java.io.IOException;
import java.sql.Date;
import java.util.Random;
import java.util.concurrent.CountDownLatch;

/**
 * @desc   zookeeper可用任务处理类
 * @author linuo
 * @time   2023年5月11日13:23:27
 */
public class ZookeeperAvailableJobHandler extends QuartzJobBean {
    private Logger log = LoggerFactory.getLogger(this.getClass());

    @Resource
    private ZookeeperAvailableJobLogMapper zookeeperAvailableJobLogMapper;

    @Resource
    private ZookeeperAvailableJobMapper zookeeperAvailableJobMapper;

    @Resource
    private ZookeeperMapper zookeeperMapper;

    @Resource
    private HandlerHelper handlerHelper;

    @Resource
    private TimeSetConfig timeSetConfig;

    @SneakyThrows
    @Override
    protected void executeInternal(JobExecutionContext jobExecutionContext) {
        log.info("开始执行zookeeper可用任务处理");

        // 获取1-5间的随机数
        int min = 1;
        int max = 5;
        Random random = new Random();
        int s = random.nextInt(max) % (max - min + 1) + min;
        int sleepTime = s * 1000;
        log.info("休眠[{}]秒", s);
        Thread.sleep(sleepTime);

        JobDataMap jobDataMap = jobExecutionContext.getMergedJobDataMap();
        int id = jobDataMap.getInt(JobDataMapConstant.ID);
        ZookeeperAvailableJob zookeeperAvailableJob = zookeeperAvailableJobMapper.selectById(id);
        if(zookeeperAvailableJob != null) {
            Zookeeper zookeeper = zookeeperMapper.selectById(zookeeperAvailableJob.getZookeeperId());
            if (zookeeper != null) {
                String zookeeperName = zookeeper.getName();

                // 存活状态
                Integer active = ActiveEnum.NO.getCode();

                ZookeeperAvailableJobLog zookeeperAvailableJobLog = new ZookeeperAvailableJobLog();
                zookeeperAvailableJobLog.setZookeeperAvailableJobId(id);
                zookeeperAvailableJobLog.setStartTime(new Date(System.currentTimeMillis()));

                if(Strings.isNullOrEmpty(zookeeper.getIp()) || Strings.isNullOrEmpty(zookeeper.getPort())){
                    String message = zookeeperName + "配置信息不完整,无法检测是否存活";
                    log.info(message);
                    zookeeperAvailableJobLog.setDescription(message);
                }else{
                    // 判断zookeeper是否可用
                    String message = judgeZookeeperIfActive(zookeeper);
                    if(message == null){
                        active = ActiveEnum.YES.getCode();
                    }else{
                        zookeeperAvailableJobLog.setDescription(message);
                    }
                }

                zookeeperAvailableJobLog.setActive(active);
                int insert = zookeeperAvailableJobLogMapper.insert(zookeeperAvailableJobLog);
                if(insert > 0){
                    log.info("zookeeper可用任务日志插入成功");

                    if(active.intValue() == ActiveEnum.NO.getCode().intValue()){
                        // 发送邮件
                        String message = zookeeperName + "出现异常，原因：" + zookeeperAvailableJobLog.getDescription() + "，请查看zookeeper状态并及时进行异常处理。";
                        handlerHelper.sendEmail(zookeeperAvailableJob.getAlarmEmail(), message, null);
                    }
                }
            }else{
                log.info("未在zookeeper表中查询到id=[{}]的配置信息,无法执行监控zookeeper可用任务", zookeeperAvailableJob.getZookeeperId());
            }
        }else{
            log.info("监控zookeeper可用任务id为[{}]的数据不存在,无法执行任务", id);
        }
    }

    /**
     * 判断zookeeper是否可用
     * @param zookeeper zookeeper配置信息
     * @return 返回判断结果
     */
    private String judgeZookeeperIfActive(Zookeeper zookeeper) throws IOException, InterruptedException {
        String message = null;

        int sessionTimeout = 5000;
        String server = zookeeper.getIp() + ":" + zookeeper.getPort();

        // 信号量，阻塞程序执行，用于等待zookeeper连接成功，发送成功信号
        final CountDownLatch connectedSemaphore = new CountDownLatch(1);

        try {
            log.info("第一次执行监控zookeeper是否可用任务");
            ZooKeeper zooKeeper = new ZooKeeper(server, sessionTimeout, event -> {
                //获取事件的状态
                Watcher.Event.KeeperState keeperState = event.getState();
                Watcher.Event.EventType eventType = event.getType();
                //如果是建立连接
                if(Watcher.Event.KeeperState.SyncConnected == keeperState){
                    if(Watcher.Event.EventType.None == eventType){
                        //如果建立连接成功，则发送信号量，让后续阻塞程序向下执行
                        connectedSemaphore.countDown();
                        log.info("zookeeper建立连接成功");
                    }
                }
            });

            Thread.sleep(2000);

            boolean alive = zooKeeper.getState().isAlive();
            log.info("alive => {}", alive);
            boolean connected = zooKeeper.getState().isConnected();
            log.info("connected => {}", connected);
            if(connected){
                zooKeeper.close();
            }else{
                zooKeeper.close();
                throw new Exception("zookeeper连接失败");
            }
        }catch (Exception e){
            log.warn("监控zookeeper是否可用任务出现问题,原因:[{}],待{}秒后重试", e.toString(), timeSetConfig.getRetryWaitTime());
            Thread.sleep(timeSetConfig.getRetryWaitTime() * 1000);

            try {
                log.info("第二次执行监控zookeeper是否可用任务");
                ZooKeeper zooKeeper = new ZooKeeper(server, sessionTimeout, event -> {
                    //获取事件的状态
                    Watcher.Event.KeeperState keeperState = event.getState();
                    Watcher.Event.EventType eventType = event.getType();
                    //如果是建立连接
                    if(Watcher.Event.KeeperState.SyncConnected == keeperState){
                        if(Watcher.Event.EventType.None == eventType){
                            //如果建立连接成功，则发送信号量，让后续阻塞程序向下执行
                            connectedSemaphore.countDown();
                            log.info("zookeeper建立连接成功");
                        }
                    }
                });

                Thread.sleep(2000);

                boolean alive = zooKeeper.getState().isAlive();
                log.info("alive => {}", alive);
                boolean connected = zooKeeper.getState().isConnected();
                log.info("connected => {}", connected);
                if(connected){
                    zooKeeper.close();
                    log.info("zookeeper连接成功");
                }else{
                    zooKeeper.close();
                    throw new Exception("zookeeper连接失败");
                }
            }catch (Exception e1) {
                log.warn("监控zookeeper是否可用任务出现问题,原因:[{}],待{}秒后重试", e.toString(), timeSetConfig.getRetryWaitTime());
                Thread.sleep(timeSetConfig.getRetryWaitTime() * 1000);

                try {
                    log.info("第三次执行监控zookeeper是否可用任务");
                    ZooKeeper zooKeeper = new ZooKeeper(server, sessionTimeout, event -> {
                        //获取事件的状态
                        Watcher.Event.KeeperState keeperState = event.getState();
                        Watcher.Event.EventType eventType = event.getType();
                        //如果是建立连接
                        if(Watcher.Event.KeeperState.SyncConnected == keeperState){
                            if(Watcher.Event.EventType.None == eventType){
                                //如果建立连接成功，则发送信号量，让后续阻塞程序向下执行
                                connectedSemaphore.countDown();
                                log.info("zookeeper建立连接成功");
                            }
                        }
                    });

                    Thread.sleep(2000);

                    boolean alive = zooKeeper.getState().isAlive();
                    log.info("alive => {}", alive);
                    boolean connected = zooKeeper.getState().isConnected();
                    log.info("connected => {}", connected);
                    if(connected){
                        zooKeeper.close();
                        log.info("zookeeper连接成功");
                    }else{
                        zooKeeper.close();
                        throw new Exception("zookeeper连接失败");
                    }
                }catch (Exception e2) {
                    log.warn("监控zookeeper是否可用任务出现问题,原因:[{}],zookeeper连接失败", e.toString());
                    message = e2.toString();
                }
            }
        }

        if(!Strings.isNullOrEmpty(message)){
            if(message.length() > 100){
                message = message.substring(0, 100);
            }
        }

        return message;
    }
}